#include "postgres.h"
#include "access/xlogrecord.h"
#include <signal.h>
#include <unistd.h>
#include <lmdb.h>
#include <pthread.h>
#include "postmaster/secondbuffer.h"
#include <stdio.h>
#include "utils/guc.h"

#include <stdbool.h>
#include "postmaster/interrupt.h"
#include "libpq/pqsignal.h"
#include "storage/s_lock.h"
#include "storage/spin.h"
#include "storage/shmem.h"
#include "storage/bufpage.h"
#include "storage/pmsignal.h"
#include "storage/lwlock.h"
#include "utils/memutils.h"
#include "storage/procsignal.h"
#include "utils/dynahash.h"
#include "miscadmin.h"
#include "utils/ps_status.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "utils/wait_event.h"
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "storage/bufmgr.h"
#include <sys/socket.h>
#include <sys/un.h>

/* Unix-domain socket paths on which the WAL and page cleaner threads listen. */
const char *socketfile = "/tmp/he3cleanwal";
const char *p_socketfile = "/tmp/he3cleanpage";
/*
 * Number of WalLdPageKey bytes used both as the LMDB WAL key and as one
 * message on the WAL cleaning socket: every field up to and including the
 * one-byte partition, so any struct padding after it is not included.
 */
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8))
/* Number of LdPageKey bytes sent as one message on the page cleaning socket. */
#define SizeOfCleanPage 16

/*
 * Cached client connections to the cleaner sockets, kept in a plain global
 * (so each process has its own copy).  A value of -1 means "not connected
 * yet"; SendInvalWal / SendInvalPage connect on first use.
 */
typedef struct SocketFd
{
    int walSocketFd;    /* connection to socketfile */
    int pageSocketFd;   /* connection to p_socketfile */
} SocketFd;

/*
 * Fixed-size ring of SdPageKeys recording pages inserted into the second
 * buffer by ReceivePageFromDataBuffer.  SDNUM of these live in shared memory
 * (MultiKeyArrays).
 */
typedef struct SingleKeyArray
{
    SdPageKey SdPageKeyList[SDLEN];
    uint16 head;        /* NOTE(review): presumably the consumer position; not advanced in this file section */
    uint16 tail;        /* next slot to write; wraps modulo SDLEN */
    uint16 unused;      /* number of free slots, initially SDLEN */
    slock_t oplock;     /* held while reading or updating the ring */
} SingleKeyArray;

/* Shared count of free slots summed over all SingleKeyArray rings. */
typedef struct Statisticnum
{
    double totalunused; /* starts at SDLEN * SDNUM; decremented per stored key */
    slock_t change;     /* protects totalunused */
} Statisticnum;

/*
 * Shared array of DPageKey entries ("deleted page keys arrays").
 * NOTE(review): in the visible code it is only initialized; its producer,
 * AddOneItemToDPArray, is currently commented out.
 */
typedef struct DPageKeyArray
{
    DPageKey dpk[1024];
    uint16 unused;      /* free slots, initially 1024 */
    uint16 head;
    uint16 tail;        /* next slot to fill, per the disabled AddOneItemToDPArray */
    uint16 pageIndex;   /* NOTE(review): purpose not visible here */
    uint16 walIndex;    /* NOTE(review): purpose not visible here */
    slock_t append;     /* spinlock taken around appends in the disabled code */
} DPageKeyArray;
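/*
 * Forward declarations and file-level state for the second buffer
 * (SecondBufferHash) and its LMDB-backed local page and WAL stores.
 */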
static int IsDirExist(const char *path);
static void CleanWalsByPage(WalLdPageKey *walkey);
static void CleanWalsByTable(WalLdPageKey *walkey);
static void CleanPagesByTable(LdPageKey *lpk);

/* Shared-memory hash of cached pages (SdPageKey -> SdPageValue). */
static HTAB *SecondBufferHash = NULL;
extern bool EnableHotStandby;
/* This process's cached cleaner-socket connections; -1 means not connected. */
SocketFd SocFd = {-1, -1};
/* Shared array set up by InitDPageKeyArray. */
DPageKeyArray *DPArray = NULL;

/* SDNUM shared SingleKeyArray rings, set up by InitSecondBufferMeta. */
SingleKeyArray *MultiKeyArrays;

/* LMDB environments for the local page store and the local WAL store. */
MDB_env *pageEnv = NULL;
MDB_env *walEnv = NULL;

/* Unnamed-database handles opened by InitPageDBEnv / InitWalDBEnv. */
MDB_dbi pageDbi;
MDB_dbi walDbi;

/*
 * Transactions used only while opening the databases; once the Init*DBEnv
 * functions commit them they no longer refer to live transactions.
 */
MDB_txn *pageTxn = NULL;
MDB_txn *walTxn = NULL;
/* NOTE(review): not referenced in the visible part of this file. */
MDB_cursor *cursor = NULL;

/* Shared count of free ring slots, set up by InitSecondBufferMeta. */
Statisticnum *statisticnum = NULL;

/* NUM_LOCK_PARTITIONS LWLocks allocated by CreateSecondBufferLWLocks. */
LWLockPadded *SecondBufferMainLWLockArray = NULL;

/* Directories of the LMDB page and WAL stores (created if missing). */
char *lmdb_page_directory;
char *lmdb_wal_directory;
/* Number of BLKSZ-sized blocks requested by SecondBufferShmemSize. */
Size SNBuffers = 1024;

/*
 * SecondBufferShmemSize
 *     Shared memory requested for the second buffer: SNBuffers blocks of
 *     BLKSZ bytes each, multiplied with mul_size.
 */
Size SecondBufferShmemSize(void)
{
    return mul_size(SNBuffers, BLKSZ);
}

/*
 * SecondBufferLWLockShmemSize
 *     Shared memory needed for the second-buffer LWLock array: one padded
 *     LWLock per partition (NUM_LOCK_PARTITIONS) plus LWLOCK_PADDED_SIZE of
 *     slack consumed when CreateSecondBufferLWLocks aligns the array start.
 */
Size SecondBufferLWLockShmemSize(void)
{
    Size size;

    /* Space for the LWLock array. */
    size = mul_size(NUM_LOCK_PARTITIONS, sizeof(LWLockPadded));
    /* Room for the alignment adjustment done at allocation time. */
    size = add_size(size, LWLOCK_PADDED_SIZE);

    return size;
}

/*
 * InitializeSecondBufferLWLocks
 *     Initialize each of the NUM_LOCK_PARTITIONS LWLocks in
 *     SecondBufferMainLWLockArray.
 *
 * NOTE(review): the loop index is passed as LWLockInitialize's second
 * argument (the tranche id in upstream PostgreSQL), so each lock gets a
 * different tranche number starting at 0 -- confirm this is intended.
 */
static void
InitializeSecondBufferLWLocks(void)
{
    LWLockPadded *lock = SecondBufferMainLWLockArray;

    for (int id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, id);
}

/*
 * CreateSecondBufferLWLocks
 *     Carve the second-buffer LWLock array out of shared memory and
 *     initialize it.  Does nothing when IsUnderPostmaster is set.
 */
void CreateSecondBufferLWLocks(void)
{
    char *base;

    if (IsUnderPostmaster)
        return;

    base = (char *) ShmemAlloc(SecondBufferLWLockShmemSize());

    /*
     * Advance to the next LWLOCK_PADDED_SIZE boundary.  The size request
     * includes one extra LWLOCK_PADDED_SIZE, so this stays in bounds.
     */
    base += LWLOCK_PADDED_SIZE - ((uintptr_t) base) % LWLOCK_PADDED_SIZE;

    SecondBufferMainLWLockArray = (LWLockPadded *) base;

    InitializeSecondBufferLWLocks();
}

/*
 * InitSecondBufferMeta
 *     Attach to (creating if necessary) the shared SingleKeyArray rings
 *     (MultiKeyArrays, SDNUM of them) and the Statisticnum counter.
 *
 * Each structure is initialized only when this call created it.
 * Re-initializing an existing one would wipe live state and re-init
 * spinlocks that another process may hold.
 */
void InitSecondBufferMeta(void)
{
    bool arraysFound;
    bool statFound;

    MultiKeyArrays = (SingleKeyArray *)
        ShmemInitStruct("multi page keys arrays",
                        sizeof(SingleKeyArray) * SDNUM,
                        &arraysFound);

    statisticnum = (Statisticnum *)
        ShmemInitStruct("statistic num",
                        sizeof(Statisticnum),
                        &statFound);

    if (MultiKeyArrays == NULL || statisticnum == NULL)
    {
        ereport(PANIC, (errmsg("init secondbuffer meta fail")));
    }

    if (!statFound)
    {
        statisticnum->totalunused = SDLEN * SDNUM;
        SpinLockInit(&statisticnum->change);
    }

    if (!arraysFound)
    {
        for (int i = 0; i < SDNUM; i++)
        {
            SpinLockInit(&MultiKeyArrays[i].oplock);
            MultiKeyArrays[i].head = MultiKeyArrays[i].tail = 0;
            MultiKeyArrays[i].unused = SDLEN;
        }
    }
}

/*
 * InitDPageKeyArray
 *     Attach to (creating if necessary) the shared DPageKeyArray.
 *
 * When this call created it, the spinlock is initialized, all indexes are
 * reset and every slot is marked unused.  An existing array is left
 * untouched, because re-initializing it would discard live state.
 */
void InitDPageKeyArray(void)
{
    bool found;

    DPArray = (DPageKeyArray *)
        ShmemInitStruct("deleted page keys arrays",
                        sizeof(DPageKeyArray),
                        &found);
    if (DPArray == NULL)
    {
        ereport(PANIC, (errmsg("init DPArray fail")));
    }

    if (found)
        return;

    SpinLockInit(&DPArray->append);
    DPArray->head = DPArray->tail = DPArray->pageIndex = DPArray->walIndex = 0;
    /* Derive the capacity from the array itself rather than repeating 1024. */
    DPArray->unused = lengthof(DPArray->dpk);
}
/*
 * InitSecondBufferHash
 *     Create or attach to the shared-memory SecondBufferHash table, which
 *     maps SdPageKey -> SdPageValue and is split into NUM_LOCK_PARTITIONS
 *     partitions.
 *
 * HASH_BLOBS is used, so keys are treated as raw bytes.  Callers should
 * therefore build keys with deterministic (e.g. zeroed) padding.
 */
void InitSecondBufferHash(void)
{
    HASHCTL info;
    long init_table_size;
    long max_table_size;

    /*
     * Fixed sizing: at most 200 entries, with an initial size of half that.
     * NOTE(review): an earlier comment said these must agree with a
     * "SecondBufferhashShmemSize" function that is not visible here --
     * confirm the shared-memory estimate accounts for this table.
     */
    max_table_size = 200;
    init_table_size = max_table_size / 2;

    info.keysize = sizeof(SdPageKey);
    info.entrysize = sizeof(SdPageValue);
    info.num_partitions = NUM_LOCK_PARTITIONS;

    SecondBufferHash = ShmemInitHash("SecondBuffer hash",
                                     init_table_size,
                                     max_table_size,
                                     &info,
                                     HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
}

void InitPageDBEnv()
{
    if (!IsDirExist(lmdb_page_directory))
    {
        pg_mkdir_p(lmdb_page_directory, 0777);
    }
    mdb_env_create(&pageEnv);
    mdb_env_set_maxreaders(pageEnv, MAXREADERS);
    mdb_env_set_mapsize(pageEnv, MAPSIE);
    mdb_env_open(pageEnv, lmdb_page_directory, MDB_FIXEDMAP | MDB_NOSYNC, 0664);
    mdb_txn_begin(pageEnv, NULL, 0, &pageTxn);
    mdb_dbi_open(pageTxn, NULL, MDB_CREATE, &pageDbi);
    mdb_txn_commit(pageTxn);
}

void InitWalDBEnv()
{
    if (!IsDirExist(lmdb_wal_directory))
    {
        pg_mkdir_p(lmdb_wal_directory, 0777);
    }
    mdb_env_create(&walEnv);
    mdb_env_set_maxreaders(walEnv, MAXREADERS);
    mdb_env_set_mapsize(walEnv, MAPSIE);
    mdb_env_open(walEnv, lmdb_wal_directory, MDB_FIXEDMAP | MDB_NOSYNC, 0664);
    mdb_txn_begin(walEnv, NULL, 0, &walTxn);
    mdb_dbi_open(walTxn, NULL, MDB_CREATE | MDB_DUPSORT, &walDbi);
    mdb_txn_commit(walTxn);
}
void ClosePageDBEnv()
{
    mdb_dbi_close(pageEnv, pageDbi);
    mdb_env_close(pageEnv);
    ereport(LOG, errmsg("close page success"));
}

void CloseWalDBEnv()
{
    mdb_dbi_close(walEnv, walDbi);
    mdb_env_close(walEnv);
    ereport(LOG, errmsg("close wal success"));
}

/*
 * convertKey
 *     Copy the identifying fields of *pk (block, fork, relation, database)
 *     into the second-buffer key *sdkey.  Other bytes of *sdkey are left as
 *     they were.
 */
static void
convertKey(SdPageKey *sdkey, PageKey *pk)
{
    sdkey->blkno = pk->blkNo;
    sdkey->forkno = pk->forkNo;
    sdkey->relid = pk->relfileNode.relNode;
    sdkey->dbid = pk->relfileNode.dbNode;
}

/*
 * convertKeyLd
 *     Fill the SdPageKey part (ldkey->sk) of an LMDB page key from *pk,
 *     using the same field mapping as convertKey.
 */
static void
convertKeyLd(LdPageKey *ldkey, PageKey *pk)
{
    convertKey(&ldkey->sk, pk);
}

/*
 * secondbuffer_match
 *     Compare two SdPageKeys field by field.  Returns 0 when dbid, relid,
 *     forkno and blkno are all equal, 1 otherwise.
 *
 * NOTE(review): InitSecondBufferHash does not pass HASH_COMPARE, so this
 * function does not appear to be wired into SecondBufferHash.
 */
static int
secondbuffer_match(const void *key1, const void *key2, Size keysize)
{
    const SdPageKey *a = (const SdPageKey *) key1;
    const SdPageKey *b = (const SdPageKey *) key2;

    Assert(keysize == sizeof(SdPageKey));

    if (a->dbid != b->dbid || a->relid != b->relid ||
        a->forkno != b->forkno || a->blkno != b->blkno)
        return 1;   /* not equal */

    return 0;       /* equal */
}

/*
 * SecondBufferHashCode
 *     Hash value of *pk in SecondBufferHash.  Callers use it to select the
 *     partition lock that guards the key.
 */
static uint32
SecondBufferHashCode(const SdPageKey *pk)
{
    uint32 hashcode = get_hash_value(SecondBufferHash, (const void *) pk);

    return hashcode;
}

/*
 * SetupSecondBufferInTable
 *     Find or create the SecondBufferHash entry for *pk (HASH_ENTER_NULL).
 *
 * May return NULL when no entry is obtained; ReceivePageFromDataBuffer
 * retries in that case.  That caller holds the key's partition lock in
 * exclusive mode around this call.
 */
static SdPageValue *
SetupSecondBufferInTable(const SdPageKey *pk)
{
    bool found;

    return (SdPageValue *) hash_search(SecondBufferHash, pk,
                                       HASH_ENTER_NULL, &found);
}

/*
 * CleanUpSecondBuffer
 *     Remove the SecondBufferHash entry for *pk, if any.
 *
 * Returns true if an entry was found and removed, false otherwise.  No
 * partition lock is taken here.  NOTE(review): callers presumably must hold
 * the key's partition lock exclusively -- confirm at the call sites.
 */
static bool
CleanUpSecondBuffer(const SdPageKey *pk)
{
    bool removed;

    (void) hash_search(SecondBufferHash, (void *) pk, HASH_REMOVE, &removed);

    return removed;
}

/*
 * FindSecondBufferInTable
 *     Look up *pk in SecondBufferHash without inserting.
 *
 * Returns the entry, or NULL when it is absent or the hash table has not
 * been created yet.
 */
static SdPageValue *
FindSecondBufferInTable(const SdPageKey *pk)
{
    SdPageValue *entry;
    bool found = false;

    if (SecondBufferHash == NULL)
        return NULL;

    entry = (SdPageValue *) hash_search(SecondBufferHash, pk, HASH_FIND, &found);

    return found ? entry : NULL;
}

/*
 * ReceivePageFromDataBuffer  (sb -> ssb)
 *     Copy one page into the second buffer and record its key in one of the
 *     SingleKeyArray rings.
 *
 * pk      identifies the page; it is converted to an SdPageKey
 * buffer  page image; BLCKSZ bytes are copied from it
 *
 * Both phases loop until they succeed.  The hash insert is retried while
 * SetupSecondBufferInTable returns NULL.  The ring search starts at a
 * random ring and cycles through all SDNUM rings until one has a free slot.
 */
void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
{
    SdPageKey sk;
    uint32 newHash;
    LWLock *partitionLock;
    SdPageValue *sdPageValue = NULL;
    SingleKeyArray *sa;
    int index;

    /*
     * The key only lives for this call, so keep it on the stack (the old
     * unchecked malloc could dereference NULL).  Zero it first: the hash uses
     * HASH_BLOBS, so padding bytes would otherwise make equal keys differ.
     */
    memset(&sk, 0, sizeof(sk));
    convertKey(&sk, pk);

    newHash = SecondBufferHashCode(&sk);
    partitionLock = SecondBufferMappingPartitionLock(newHash);

    while (sdPageValue == NULL)
    {
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
        sdPageValue = SetupSecondBufferInTable(&sk);
        if (sdPageValue != NULL)
        {
            sdPageValue->canDelete = false;
            memcpy(sdPageValue->pagecontent, buffer, BLCKSZ);
        }
        LWLockRelease(partitionLock);
    }

    /* NOTE(review): this reseeds the process-wide rand() state on every call. */
    srand((int) clock());
    index = rand() % SDNUM;

    for (;;)
    {
        sa = &MultiKeyArrays[index];
        SpinLockAcquire(&sa->oplock);
        if (sa->unused > 0)
        {
            sa->SdPageKeyList[sa->tail] = sk;
            sa->tail = (sa->tail + 1) % SDLEN;
            sa->unused--;
            SpinLockAcquire(&statisticnum->change);
            statisticnum->totalunused--;
            SpinLockRelease(&statisticnum->change);
            SpinLockRelease(&sa->oplock);
            break;
        }
        SpinLockRelease(&sa->oplock);
        index = (index + 1) % SDNUM;
    }
}

/*
 * GetPageFromSecondBuffer  (ssb -> sb)
 *     Look up pk in the second buffer.  On a hit, copy BLCKSZ bytes of the
 *     cached page into buffer while holding the key's partition lock in
 *     shared mode.
 *
 * Returns buffer on a hit, NULL on a miss.
 */
static uint8_t *
GetPageFromSecondBuffer(PageKey *pk, uint8_t *buffer)
{
    SdPageKey sk;
    LWLock *partitionLock;
    SdPageValue *sv;
    uint8_t *result = NULL;

    /* Stack key, zeroed so HASH_BLOBS hashing sees deterministic padding. */
    memset(&sk, 0, sizeof(sk));
    convertKey(&sk, pk);

    partitionLock = SecondBufferMappingPartitionLock(SecondBufferHashCode(&sk));
    LWLockAcquire(partitionLock, LW_SHARED);

    sv = FindSecondBufferInTable(&sk);
    if (sv != NULL)
    {
        memcpy(buffer, sv->pagecontent, BLCKSZ);
        result = buffer;
    }

    LWLockRelease(partitionLock);
    return result;
}

/*
 * GetPageFromLocalBuffer  (lc -> sb)
 *     Look pk up in the local LMDB page store and copy the stored value into
 *     buffer.
 *
 * Returns buffer on a hit.  Returns NULL on a miss, when the page store is
 * not open (pageEnv == NULL), or when a transaction cannot be started.
 *
 * NOTE(review): data.mv_size bytes are copied, so buffer must be at least as
 * large as the stored value.  GetPageFromCurrentNode passes a BLKSZ buffer.
 */
static uint8_t *
GetPageFromLocalBuffer(PageKey *pk, uint8_t *buffer)
{
    MDB_txn *tmptxn = NULL;
    MDB_val key, data;
    LdPageKey lk;
    uint8_t *result = NULL;
    int rc;

    if (pageEnv == NULL)
        return NULL;

    /* Zero the key: all sizeof(LdPageKey) bytes take part in the lookup. */
    memset(&lk, 0, sizeof(lk));
    convertKeyLd(&lk, pk);

    key.mv_size = sizeof(LdPageKey);
    key.mv_data = &lk;
    data.mv_size = 0;
    data.mv_data = NULL;

    rc = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
    if (rc != 0)
    {
        elog(LOG, "mdb_txn_begin failed when get page, err %d", rc);
        return NULL;
    }

    /*
     * Values returned by LMDB are only valid until the transaction ends, so
     * copy the page out before aborting.  Return the caller's buffer, never
     * a pointer into LMDB memory.
     */
    if (mdb_get(tmptxn, pageDbi, &key, &data) == 0 && data.mv_data != NULL)
    {
        memcpy(buffer, data.mv_data, data.mv_size);
        result = buffer;
    }

    mdb_txn_abort(tmptxn);
    return result;
}

/*
 * SwapLsnFromLittleToBig
 *     Reverse the byte order of a 64-bit LSN on builds without
 *     WORDS_BIGENDIAN (it is returned unchanged otherwise).  For example,
 *     0x0102030405060708 becomes 0x0807060504030201.
 *
 * NOTE(review): the swapped form is what gets embedded in LMDB keys,
 * presumably so that the key bytes sort in numeric LSN order.
 */
uint64_t
SwapLsnFromLittleToBig(uint64_t lsn)
{
#ifdef WORDS_BIGENDIAN
    return lsn;
#else
    uint64_t reversed = 0;

    /* Move the lowest byte of lsn into the lowest free position of reversed. */
    for (int byte = 0; byte < 8; byte++)
    {
        reversed = (reversed << 8) | (lsn & 0xFF);
        lsn >>= 8;
    }
    return reversed;
#endif
}

/*
 * SwapLsnFromBigToLittle
 *     Inverse of SwapLsnFromLittleToBig.  Reversing the bytes twice yields
 *     the original value, so the same transformation is applied.
 */
uint64_t
SwapLsnFromBigToLittle(uint64_t lsn)
{
    return SwapLsnFromLittleToBig(lsn);
}

/*
 * GetWalFromLocalBuffer
 *     Concatenate, in key order, the stored WAL fragments for the page named
 *     by wpk->sk whose LSN (after SwapLsnFromBigToLittle) is below replyLsn.
 *
 * wpk       search key.  The scan starts at the first stored key >= the
 *           first SizeOfCleanWal bytes of *wpk (MDB_SET_RANGE).
 * replyLsn  exclusive upper bound on the fragment LSN
 *
 * Returns a Bufrd whose buf is always malloc'd (the caller frees it).
 * count is the number of bytes collected and cap the allocated size.
 * Failure to start the LMDB transaction or open the cursor is a PANIC.
 */
Bufrd GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
{
    MDB_txn *tmptxn;
    MDB_cursor *tmpcursor;
    MDB_val key, data;
    Bufrd bufrd;
    WalLdPageKey cur;
    Size waldatalen = 0;
    Size roomlen = 2048;
    uint8_t *waldata;
    uint32 dbid = wpk->sk.dbid;
    uint32 relid = wpk->sk.relid;
    uint32 forkno = wpk->sk.forkno;
    uint32 blkno = wpk->sk.blkno;
    int rc;

    bufrd.buf = NULL;
    bufrd.cap = 0;
    bufrd.count = 0;

    waldata = (uint8_t *) malloc(roomlen);
    if (waldata == NULL)
        ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));

    key.mv_size = SizeOfCleanWal;
    key.mv_data = wpk;
    data.mv_size = 0;
    data.mv_data = NULL;

    rc = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
    if (rc != 0)
        ereport(PANIC, errmsg("mdb_txn_begin failed,error is:%d", rc));

    rc = mdb_cursor_open(tmptxn, walDbi, &tmpcursor);
    if (rc != 0)
        ereport(PANIC, errmsg("mdb_txn_open failed,error is:%d", rc));

    rc = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE);
    if (rc != 0)
    {
        ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
                            rc, dbid, relid, forkno, blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
        mdb_cursor_close(tmpcursor);
        mdb_txn_abort(tmptxn);
        bufrd.buf = waldata;
        bufrd.cap = roomlen;
        return bufrd;
    }

    for (;;)
    {
        /*
         * LMDB key memory is not guaranteed to be aligned for uint64 access,
         * so copy the key into a local struct before reading its fields.
         */
        memset(&cur, 0, sizeof(cur));
        memcpy(&cur, key.mv_data, Min(key.mv_size, sizeof(cur)));

        if (cur.sk.dbid != dbid || cur.sk.relid != relid ||
            cur.sk.forkno != forkno || cur.sk.blkno != blkno ||
            SwapLsnFromBigToLittle(cur.pageLsn) >= replyLsn)
            break;

        /* Grow before every copy so no fragment can overrun the buffer. */
        if (waldatalen + data.mv_size > roomlen)
        {
            Size newlen = Max(roomlen + 1024, waldatalen + data.mv_size);
            uint8_t *grown = (uint8_t *) realloc(waldata, newlen);

            if (grown == NULL)
            {
                mdb_cursor_close(tmpcursor);
                mdb_txn_abort(tmptxn);
                free(waldata);
                ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
            }
            waldata = grown;
            roomlen = newlen;
        }

        /* Copy while the transaction is still open; LMDB owns data.mv_data. */
        memcpy(waldata + waldatalen, data.mv_data, data.mv_size);
        waldatalen += data.mv_size;

        if (mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT) != 0)
            break;
    }

    mdb_cursor_close(tmpcursor);
    mdb_txn_abort(tmptxn);

    bufrd.buf = waldata;
    bufrd.count = waldatalen;
    bufrd.cap = roomlen;
    return bufrd;
}

/*
 * AddOneItemToDPArray
 *     Currently a no-op: odpk is ignored.  The implementation that appended
 *     a DPageKey built from odpk to the shared DPArray (spinning until a
 *     slot was free) is kept below but commented out.
 */
void AddOneItemToDPArray(OriginDPageKey odpk)
{
    // ereport(LOG, (errmsg("AddOneItemToDPArray in processing")));
    // DPageKey dpk;
    // dpk.pk = odpk.pk;
    // dpk.operation = odpk.opration;
    // dpk.pagedeleted = false;

    // while(1)
    // {
    //     SpinLockAcquire(&DPArray->append);
    //     if (DPArray->unused > 0)
    //     {
    //         DPArray->dpk[DPArray->tail] = dpk;
    //         DPArray->tail ++;
    //         DPArray->unused --;
    //         SpinLockRelease(&DPArray->append);
    //         break;
    //     }
    //     SpinLockRelease(&DPArray->append);
    //     pg_usleep(1);

    // }
    // ereport(LOG, (errmsg("AddOneItemToDPArray done")));
}

/*
 * storeWalInLocalBuffer
 *     Store `length` WAL records from ks[] in the WAL LMDB database using a
 *     single write transaction.
 *
 * Each ks[i].buf begins with a 4-byte little-endian length, totallen.  The
 * first totallen bytes of buf (that header included) are stored under the
 * key (ks[i].lpk.sk, byte-swapped ks[i].lsn, partition).  Records longer
 * than chunkSize bytes are split into consecutive chunkSize-byte pieces
 * with partition 0, 1, 2, ...
 *
 * NOTE(review): 511 appears to match LMDB's default maximum data-item size
 * for MDB_DUPSORT databases.  The uint8 partition also limits one record to
 * 256 pieces -- confirm longer records cannot occur.
 *
 * Errors are logged, not raised.  If the transaction or cursor cannot be
 * opened, nothing is written.
 */
void storeWalInLocalBuffer(kvStruct *ks, int32 length)
{
    /* Largest value stored in a single LMDB record. */
    const uint32_t chunkSize = 511;
    MDB_txn *tmptxn = NULL;
    MDB_cursor *tmpcursor = NULL;
    MDB_val key, data;
    WalLdPageKey wlpk;
    int rc;

    rc = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
    if (rc != 0)
    {
        ereport(LOG, errmsg("put mdb_txn_begin failed,error is:%d", rc));
        return;
    }

    rc = mdb_cursor_open(tmptxn, walDbi, &tmpcursor);
    if (rc != 0)
    {
        ereport(LOG, errmsg("put mdb_txn_open failed,error is:%d", rc));
        mdb_txn_abort(tmptxn);
        return;
    }

    /* Zeroed so any padding inside the key bytes is deterministic. */
    memset(&wlpk, 0, sizeof(wlpk));

    for (int i = 0; i < length; i++)
    {
        uint8_t *buf = ks[i].buf;
        /* Widen each byte before shifting: an int shifted by 24 can overflow. */
        uint32_t totallen = (uint32_t) buf[0] |
            ((uint32_t) buf[1] << 8) |
            ((uint32_t) buf[2] << 16) |
            ((uint32_t) buf[3] << 24);
        int cp;

        wlpk.sk = ks[i].lpk.sk;
        wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
        key.mv_size = SizeOfCleanWal;
        key.mv_data = &wlpk;

        if (totallen > chunkSize)
        {
            uint8_t part = 0;
            Size offset = 0;
            uint32_t remaining = totallen;

            for (;;)
            {
                bool last = (remaining <= chunkSize);

                wlpk.partition = part;
                key.mv_data = &wlpk;
                /* LMDB copies the value during put, so point straight into buf. */
                data.mv_size = last ? remaining : chunkSize;
                data.mv_data = buf + offset;

                cp = mdb_cursor_put(tmpcursor, &key, &data, MDB_NODUPDATA);
                if (cp != 0)
                {
                    if (last)
                        ereport(LOG, errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d",
                                            cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
                    else
                        ereport(LOG, errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
                                            cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
                    if (cp == MDB_KEYEXIST)
                        break;
                }
                if (last)
                    break;

                part++;
                offset += chunkSize;
                remaining -= chunkSize;
            }
        }
        else
        {
            wlpk.partition = 0;
            data.mv_size = totallen;
            data.mv_data = buf;

            cp = mdb_cursor_put(tmpcursor, &key, &data, MDB_NODUPDATA);
            if (cp != 0)
            {
                ereport(LOG, errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
                                    cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno,
                                    SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
                if (cp == MDB_KEYEXIST)
                {
                    cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET);
                    if (cp != 0)
                        ereport(LOG, errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d",
                                            cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
                }
            }
        }
    }

    mdb_cursor_close(tmpcursor);

    rc = mdb_txn_commit(tmptxn);
    if (rc != 0)
        ereport(LOG, errmsg("put mdb_txn_commit failed,error is:%d", rc));
}

/*
 * GetPageFromCurrentNode
 *     Fetch page pk into bufrd, first from the second buffer and then from
 *     the local LMDB page store.
 *
 * If bufrd->buf is NULL, a BLKSZ buffer is allocated for the page.
 *
 * On a hit, bufrd->buf holds the BLCKSZ-byte page.  The page's WAL with LSN
 * below pk.replyLsn is appended unless a promotion was triggered, hot
 * standby is disabled, or push_standby is set.  count and cap are set to
 * the total byte length.  When WAL was appended, the cleaner is then asked
 * to drop that page's WAL below replyLsn, followed by a sync request (the
 * all-zero key with forkno 32).
 *
 * On a miss, bufrd->buf/cap/count are reset to NULL/0, and a buffer
 * allocated by this call is freed first.
 */
void GetPageFromCurrentNode(PageKey pk, Bufrd *bufrd)
{
    uint8_t *page;
    bool allocatedHere = false;
    WalLdPageKey wlpk;
    Bufrd waldata;

    if (bufrd->buf == NULL)
    {
        bufrd->buf = (uint8_t *) malloc(BLKSZ);
        if (bufrd->buf == NULL)
            ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
        allocatedHere = true;
    }

    page = GetPageFromSecondBuffer(&pk, bufrd->buf);
    if (page == NULL)
        page = GetPageFromLocalBuffer(&pk, bufrd->buf);

    if (page == NULL)
    {
        /* Miss: don't leak the buffer allocated above. */
        if (allocatedHere)
            free(bufrd->buf);
        bufrd->buf = NULL;
        bufrd->cap = 0;
        bufrd->count = 0;
        return;
    }

    if (*isPromoteIsTriggered || EnableHotStandby == false || push_standby)
    {
        bufrd->count = BLCKSZ;
        bufrd->cap = BLCKSZ;
        return;
    }

    /* Zeroed: SizeOfCleanWal bytes of this key are sent over a socket. */
    memset(&wlpk, 0, sizeof(wlpk));
    wlpk.sk.dbid = pk.relfileNode.dbNode;
    wlpk.sk.relid = pk.relfileNode.relNode;
    wlpk.sk.forkno = pk.forkNo;
    wlpk.sk.blkno = pk.blkNo;
    wlpk.pageLsn = SwapLsnFromLittleToBig(pk.pageLsn);
    wlpk.partition = 0;

    waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn);
    if (waldata.count > 0)
    {
        uint8_t *grown = (uint8_t *) realloc(bufrd->buf, BLCKSZ + waldata.count);

        if (grown == NULL)
        {
            free(waldata.buf);
            ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
        }
        bufrd->buf = grown;
        memcpy(bufrd->buf + BLCKSZ, waldata.buf, waldata.count);

        /* Ask the cleaner to drop this page's WAL below replyLsn ... */
        wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn);
        SendInvalWal(&wlpk);

        /* ... then send the sync request (all zero except forkno 32). */
        wlpk.sk.dbid = 0;
        wlpk.sk.relid = 0;
        wlpk.sk.forkno = 32;
        wlpk.sk.blkno = 0;
        wlpk.pageLsn = 0;
        wlpk.partition = 0;
        SendInvalWal(&wlpk);
    }
    bufrd->cap = bufrd->count = BLCKSZ + waldata.count;
    free(waldata.buf);
}

// void SendInvalWal(WalLdPageKey *walkey) {
//     int     sock_fd;
//     struct  sockaddr_un un;
//     un.sun_family = AF_UNIX;
//     strcpy(un.sun_path, socketfile);
//     sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
//     if (sock_fd < 0)
//     {
//         elog(WARNING, "request socket failed");
//         return;
//     }

//     if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
//     {
//         elog(WARNING, "connect socket failed");
//         return;
//     }
//     send(sock_fd, walkey, SizeOfCleanWal, 0);
//     close(sock_fd);
//     return;
// }

/*
 * SendInvalWal
 *     Send the first SizeOfCleanWal bytes of *walkey to the WAL cleaner on
 *     socketfile, connecting on first use and caching the connection in
 *     SocFd.walSocketFd.
 *
 * Failures are reported as WARNING and the message is dropped.  After a
 * failed connect or send the socket is closed and SocFd.walSocketFd reset
 * to -1, so the next call reconnects instead of reusing a dead socket.
 */
void SendInvalWal(WalLdPageKey *walkey)
{
#ifdef MSG_NOSIGNAL
    const int sendFlags = MSG_NOSIGNAL;    /* report EPIPE instead of raising SIGPIPE */
#else
    const int sendFlags = 0;
#endif
    const char *p = (const char *) walkey;
    size_t remaining = SizeOfCleanWal;

    if (SocFd.walSocketFd < 0)
    {
        struct sockaddr_un un;
        int fd;

        memset(&un, 0, sizeof(un));
        un.sun_family = AF_UNIX;
        snprintf(un.sun_path, sizeof(un.sun_path), "%s", socketfile);

        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
        {
            elog(WARNING, "request socket failed");
            return;
        }

        if (connect(fd, (struct sockaddr *) &un, sizeof(un)) < 0)
        {
            elog(WARNING, "connect socket failed");
            close(fd);
            return;
        }
        SocFd.walSocketFd = fd;
    }

    /* A stream send may be partial; keep going until the whole key is sent. */
    while (remaining > 0)
    {
        ssize_t n = send(SocFd.walSocketFd, p, remaining, sendFlags);

        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            elog(WARNING, "send to clean wal socket failed: %m");
            close(SocFd.walSocketFd);
            SocFd.walSocketFd = -1;
            return;
        }
        p += n;
        remaining -= (size_t) n;
    }
}

/*
 * SendInvalPage
 *     Send the first SizeOfCleanPage bytes of *ldKey to the page cleaner on
 *     p_socketfile, connecting on first use and caching the connection in
 *     SocFd.pageSocketFd.
 *
 * Failures are reported as WARNING and the message is dropped.  After a
 * failed connect or send the socket is closed and SocFd.pageSocketFd reset
 * to -1, so the next call reconnects instead of reusing a dead socket.
 */
void SendInvalPage(LdPageKey *ldKey)
{
#ifdef MSG_NOSIGNAL
    const int sendFlags = MSG_NOSIGNAL;    /* report EPIPE instead of raising SIGPIPE */
#else
    const int sendFlags = 0;
#endif
    const char *p = (const char *) ldKey;
    size_t remaining = SizeOfCleanPage;

    if (SocFd.pageSocketFd < 0)
    {
        struct sockaddr_un un;
        int fd;

        memset(&un, 0, sizeof(un));
        un.sun_family = AF_UNIX;
        snprintf(un.sun_path, sizeof(un.sun_path), "%s", p_socketfile);

        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
        {
            elog(WARNING, "request socket failed");
            return;
        }

        if (connect(fd, (struct sockaddr *) &un, sizeof(un)) < 0)
        {
            elog(WARNING, "connect socket failed");
            close(fd);
            return;
        }
        SocFd.pageSocketFd = fd;
    }

    /* A stream send may be partial; keep going until the whole key is sent. */
    while (remaining > 0)
    {
        ssize_t n = send(SocFd.pageSocketFd, p, remaining, sendFlags);

        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            elog(WARNING, "send to clean page socket failed: %m");
            close(SocFd.pageSocketFd);
            SocFd.pageSocketFd = -1;
            return;
        }
        p += n;
        remaining -= (size_t) n;
    }
}
/*
 * ReadCleanWalMessage
 *     Read exactly len bytes from fd into buf, retrying after EINTR and
 *     short reads.  Returns false if the peer closed the connection or
 *     recv failed.
 */
static bool
ReadCleanWalMessage(int fd, void *buf, size_t len)
{
    char *p = (char *) buf;

    while (len > 0)
    {
        ssize_t n = recv(fd, p, len, 0);

        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return false;
        p += n;
        len -= (size_t) n;
    }
    return true;
}

/*
 * doCleanWalInLmdb
 *     Per-connection worker started by CleanWalsInLmdb.  fd carries the
 *     connected socket descriptor cast through intptr_t.
 *
 * Each message is the first SizeOfCleanWal bytes of a WalLdPageKey:
 *   - all fields zero except forkno == 32: sync walEnv (mdb_env_sync);
 *   - all fields zero: close the connection and stop;
 *   - otherwise partition == 0 -> CleanWalsByPage, else CleanWalsByTable.
 * The connection is also closed when the peer disconnects or recv fails.
 * walEnv is synced once more before returning.
 */
void *doCleanWalInLmdb(void *fd)
{
    int new_fd = (int) (intptr_t) fd;
    int syncFlag;
    /*
     * Per-thread, suitably aligned message buffer.  The former static
     * buffer was shared by every connection thread, which was a data race.
     */
    union
    {
        WalLdPageKey key;
        char raw[SizeOfCleanWal];
    } msg;
    WalLdPageKey *wpk = &msg.key;

    for (;;)
    {
        memset(&msg, 0, sizeof(msg));
        if (!ReadCleanWalMessage(new_fd, msg.raw, SizeOfCleanWal))
        {
            close(new_fd);
            break;
        }

        if (0 == wpk->partition && 0 == wpk->pageLsn &&
            0 == wpk->sk.blkno && 0 == wpk->sk.dbid &&
            32 == wpk->sk.forkno && 0 == wpk->sk.relid)
        {
            syncFlag = mdb_env_sync(walEnv, 1);
            if (syncFlag != 0)
            {
                printf("wal mdb_env_sync is failed, errcode is :%d\n", syncFlag);
            }
        }
        else if (0 == wpk->partition && 0 == wpk->pageLsn &&
                 0 == wpk->sk.blkno && 0 == wpk->sk.dbid &&
                 0 == wpk->sk.forkno && 0 == wpk->sk.relid)
        {
            close(new_fd);
            break;
        }
        else if (wpk->partition == 0)
        {
            CleanWalsByPage(wpk);
        }
        else
        {
            CleanWalsByTable(wpk);
        }
    }

    mdb_env_sync(walEnv, 1);
    return NULL;
}

/*
 * CleanWalsInLmdb
 *     Listener loop (pthread start-routine signature).  It binds a Unix
 *     socket at socketfile, replacing any stale file, and starts one
 *     detached doCleanWalInLmdb thread per accepted connection.
 *
 * Never returns.  Socket setup failures, and accept failures other than
 * EINTR/ECONNABORTED, are PANICs.
 */
void *CleanWalsInLmdb(void *arg)
{
    struct sockaddr_un un;
    int fd;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
    {
        elog(PANIC, "request cleanwal socket failed");
    }

    memset(&un, 0, sizeof(un));
    un.sun_family = AF_UNIX;
    unlink(socketfile);
    snprintf(un.sun_path, sizeof(un.sun_path), "%s", socketfile);

    if (bind(fd, (struct sockaddr *) &un, sizeof(un)) < 0)
    {
        elog(PANIC, "bind cleanwal socket failed");
    }
    if (listen(fd, MaxBackends + 8) < 0)
    {
        elog(PANIC, "listen cleanwal socket failed");
    }

    for (;;)
    {
        pthread_t p;
        int new_fd;
        int rc;

        new_fd = accept(fd, NULL, NULL);
        if (new_fd < 0)
        {
            /* Interrupted by a signal or aborted by the client: just retry. */
            if (errno == EINTR || errno == ECONNABORTED)
                continue;
            close(fd);
            unlink(socketfile);
            elog(PANIC, "cannot accept client connect request");
        }

        /*
         * Pass the descriptor by value.  Passing &new_fd let the next
         * accept() overwrite it before the new thread had read it.
         */
        rc = pthread_create(&p, NULL, doCleanWalInLmdb, (void *) (intptr_t) new_fd);
        if (rc != 0)
        {
            fprintf(stderr, "could not create clean wal thread, errcode is: %d\n", rc);
            close(new_fd);
            continue;
        }
        /* Nobody joins these threads; release their resources on exit. */
        pthread_detach(p);
    }
}

/*
 * ReadCleanPageMessage
 *     Read exactly len bytes from fd into buf, retrying after EINTR and
 *     short reads.  Returns false if the peer closed the connection or
 *     recv failed.
 */
static bool
ReadCleanPageMessage(int fd, void *buf, size_t len)
{
    char *p = (char *) buf;

    while (len > 0)
    {
        ssize_t n = recv(fd, p, len, 0);

        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return false;
        p += n;
        len -= (size_t) n;
    }
    return true;
}

/*
 * doCleanPageInLmdb
 *     Per-connection worker started by CleanPagesInLmdb.  fd carries the
 *     connected socket descriptor cast through intptr_t.
 *
 * Each message is the first SizeOfCleanPage bytes of an LdPageKey:
 *   - sk all zero except forkno == 32: sync pageEnv (mdb_env_sync);
 *   - sk all zero: close the connection and stop;
 *   - otherwise: CleanPagesByTable.
 * The connection is also closed when the peer disconnects or recv fails.
 */
void *doCleanPageInLmdb(void *fd)
{
    int new_fd = (int) (intptr_t) fd;
    int syncFlag;
    /*
     * Per-thread, suitably aligned message buffer.  The former static
     * buffer was shared by every connection thread, which was a data race.
     */
    union
    {
        LdPageKey key;
        char raw[SizeOfCleanPage];
    } msg;
    LdPageKey *lpk = &msg.key;

    for (;;)
    {
        memset(&msg, 0, sizeof(msg));
        if (!ReadCleanPageMessage(new_fd, msg.raw, SizeOfCleanPage))
        {
            close(new_fd);
            break;
        }

        if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 32 == lpk->sk.forkno && 0 == lpk->sk.relid)
        {
            syncFlag = mdb_env_sync(pageEnv, 1);
            if (syncFlag != 0)
            {
                printf("page mdb_env_sync is failed,errcode is: %d\n", syncFlag);
            }
        }
        else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid)
        {
            close(new_fd);
            break;
        }
        else
        {
            CleanPagesByTable(lpk);
        }
    }

    return NULL;
}

/*
 * CleanPagesInLmdb
 *     Listener loop (pthread start-routine signature).  It binds a Unix
 *     socket at p_socketfile, replacing any stale file, and starts one
 *     detached doCleanPageInLmdb thread per accepted connection.
 *
 * Never returns.  Socket setup failures, and accept failures other than
 * EINTR/ECONNABORTED, are PANICs.
 */
void *CleanPagesInLmdb(void *arg)
{
    struct sockaddr_un un;
    int fd;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0)
    {
        elog(PANIC, "request cleanwal socket failed");
    }

    memset(&un, 0, sizeof(un));
    un.sun_family = AF_UNIX;
    unlink(p_socketfile);
    snprintf(un.sun_path, sizeof(un.sun_path), "%s", p_socketfile);

    if (bind(fd, (struct sockaddr *) &un, sizeof(un)) < 0)
    {
        elog(PANIC, "bind cleanwal socket failed");
    }
    if (listen(fd, MaxBackends + 8) < 0)
    {
        elog(PANIC, "listen cleanwal socket failed");
    }

    for (;;)
    {
        pthread_t p;
        int new_fd;
        int rc;

        new_fd = accept(fd, NULL, NULL);
        if (new_fd < 0)
        {
            /* Interrupted by a signal or aborted by the client: just retry. */
            if (errno == EINTR || errno == ECONNABORTED)
                continue;
            close(fd);
            unlink(p_socketfile);
            elog(PANIC, "cannot accept client connect request");
        }

        /*
         * Pass the descriptor by value.  Passing &new_fd let the next
         * accept() overwrite it before the new thread had read it.
         */
        rc = pthread_create(&p, NULL, doCleanPageInLmdb, (void *) (intptr_t) new_fd);
        if (rc != 0)
        {
            fprintf(stderr, "could not create clean page thread, errcode is: %d\n", rc);
            close(new_fd);
            continue;
        }
        /* Nobody joins these threads; release their resources on exit. */
        pthread_detach(p);
    }
}

/*
 * IsDirExist
 *     Return 1 if `path` exists, 0 otherwise.  access(F_OK) only tests for
 *     existence, so any kind of file counts, not just a directory.
 */
static int
IsDirExist(const char *path)
{
    if (access(path, F_OK) == 0)
        return 1;
    return 0;
}

/*
 * CleanWalsByPage
 *     In one write transaction, delete every stored WAL fragment for the
 *     page walkey->sk whose LSN is below the cutoff in walkey->pageLsn.
 *
 * walkey->pageLsn carries the cutoff in swapped (big-endian) form.  It is
 * overwritten with 0 here so the MDB_SET_RANGE scan starts at the page's
 * first stored key.  Errors are logged, and the transaction is always
 * committed or aborted before returning.
 */
static void
CleanWalsByPage(WalLdPageKey *walkey)
{
    MDB_txn *tmptxn;
    MDB_cursor *tmpcursor;
    MDB_val key, data;
    WalLdPageKey cur;
    int success;
    uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
    uint32 dbid, relid, forkno, blkno;

    walkey->pageLsn = 0;
    key.mv_size = SizeOfCleanWal;
    key.mv_data = walkey;

    data.mv_size = 0;
    data.mv_data = NULL;

    dbid = walkey->sk.dbid;
    relid = walkey->sk.relid;
    forkno = walkey->sk.forkno;
    blkno = walkey->sk.blkno;

    success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
    if (success != 0)
    {
        elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success);
        return;
    }

    success = mdb_cursor_open(tmptxn, walDbi, &tmpcursor);
    if (success != 0)
    {
        elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success);
        /* Release the write transaction, or LMDB's writer lock stays held. */
        mdb_txn_abort(tmptxn);
        return;
    }

    success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE);
    if (success != 0)
    {
        ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
                            success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn)));
        mdb_cursor_close(tmpcursor);
        mdb_txn_abort(tmptxn);
        return;
    }

    for (;;)
    {
        /* LMDB key memory may be unaligned; copy it before reading fields. */
        memset(&cur, 0, sizeof(cur));
        memcpy(&cur, key.mv_data, Min(key.mv_size, sizeof(cur)));

        if (cur.sk.dbid != dbid || cur.sk.relid != relid ||
            cur.sk.forkno != forkno || cur.sk.blkno != blkno ||
            SwapLsnFromBigToLittle(cur.pageLsn) >= replayLsn)
            break;

        success = mdb_cursor_del(tmpcursor, 0);
        if (success != 0)
            elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
                 success, cur.sk.relid, cur.sk.forkno, cur.sk.blkno, SwapLsnFromBigToLittle(cur.pageLsn));

        if (mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT) != 0)
            break;
    }

    mdb_cursor_close(tmpcursor);
    success = mdb_txn_commit(tmptxn);
    if (success != 0)
        elog(LOG, "mdb_txn_commit failed when clean wals, err %d", success);
}

/*
 * CleanWalsByTable
 *     Delete from the WAL LMDB every record of one relation fork whose page
 *     LSN is older than a cutoff LSN, in a single write transaction.
 *
 * walkey: on entry, sk identifies (dbid, relid, forkno, blkno) and pageLsn
 *         holds the cutoff in the byte order the LMDB keys use (it is
 *         converted with SwapLsnFromBigToLittle).  pageLsn and partition are
 *         overwritten with 0 so the key becomes the lower bound of an
 *         MDB_SET_RANGE scan; the scan therefore starts at sk.blkno.
 *         NOTE(review): callers presumably pass blkno 0 to clean the whole
 *         fork; confirm.
 *
 * The whole (dbid, relid, forkno) range is scanned.  Records at or after the
 * cutoff are skipped rather than ending the scan, because keys are ordered by
 * block before LSN and later blocks may still hold older records.
 *
 * Errors are logged; nothing is reported to the caller.
 */
static void
CleanWalsByTable(WalLdPageKey *walkey)
{
    MDB_txn    *tmptxn = NULL;
    MDB_cursor *tmpcursor = NULL;
    MDB_val     key, data;
    WalLdPageKey *cur;
    int         rc;
    uint64      replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
    uint32      dbid = walkey->sk.dbid;
    uint32      relid = walkey->sk.relid;
    uint32      forkno = walkey->sk.forkno;
    uint32      blkno = walkey->sk.blkno;

    /* Lowest possible (lsn, partition) for this block: start of the range. */
    walkey->pageLsn = 0;
    walkey->partition = 0;
    key.mv_size = SizeOfCleanWal;
    key.mv_data = walkey;

    data.mv_size = 0;
    data.mv_data = NULL;

    rc = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
    if (rc != 0)
    {
        elog(LOG, "mdb_txn_begin failed when clean wals, err %d", rc);
        return;
    }

    rc = mdb_cursor_open(tmptxn, walDbi, &tmpcursor);
    if (rc != 0)
    {
        elog(LOG, "mdb_cursor_open failed when clean wals, err %d", rc);
        /* Release the write transaction, otherwise LMDB's writer lock leaks. */
        mdb_txn_abort(tmptxn);
        return;
    }

    rc = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE);
    if (rc != 0)
    {
        ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
                            rc, relid, forkno, blkno, replayLsn));
        mdb_cursor_close(tmpcursor);
        mdb_txn_abort(tmptxn);
        return;
    }

    for (;;)
    {
        cur = (WalLdPageKey *) key.mv_data;

        /* Left this relation fork: done. */
        if (cur->sk.dbid != dbid || cur->sk.relid != relid || cur->sk.forkno != forkno)
            break;

        if (SwapLsnFromBigToLittle(cur->pageLsn) < replayLsn)
        {
            rc = mdb_cursor_del(tmpcursor, 0);
            if (rc != 0)
                elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
                     rc, cur->sk.relid, cur->sk.forkno, cur->sk.blkno, SwapLsnFromBigToLittle(cur->pageLsn));
        }

        /* After a delete, MDB_NEXT yields the record that followed it. */
        if (mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT) != 0)
            break;
    }

    mdb_cursor_close(tmpcursor);
    rc = mdb_txn_commit(tmptxn);
    if (rc != 0)
        elog(WARNING, "mdb_txn_commit failed when clean wals, err %d", rc);
}

/*
 * CleanPagesByTable
 *     Delete from the page LMDB every record whose key shares ldKey's
 *     (dbid, relid, forkno), in a single write transaction.
 *
 * ldKey: the first SizeOfCleanPage bytes are used as the lower bound of an
 *        MDB_SET_RANGE scan.  NOTE(review): presumably those bytes are the
 *        sk part of the key, so the scan starts at ldKey->sk.blkno; confirm.
 *
 * Every deleted page is logged at LOG level.  Errors are logged; nothing is
 * reported to the caller.
 */
static void
CleanPagesByTable(LdPageKey *ldKey)
{
    MDB_txn    *tmptxn = NULL;
    MDB_cursor *tmpcursor = NULL;
    MDB_val     key, data;
    LdPageKey  *cur;
    int         rc;
    uint32      dbid = ldKey->sk.dbid;
    uint32      relid = ldKey->sk.relid;
    uint32      forkno = ldKey->sk.forkno;
    uint32      blkno = ldKey->sk.blkno;

    key.mv_size = SizeOfCleanPage;
    key.mv_data = ldKey;

    data.mv_size = 0;
    data.mv_data = NULL;

    rc = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
    if (rc != 0)
    {
        elog(LOG, "mdb_txn_begin failed when clean pages, err %d", rc);
        return;
    }

    rc = mdb_cursor_open(tmptxn, pageDbi, &tmpcursor);
    if (rc != 0)
    {
        elog(LOG, "mdb_cursor_open failed when clean pages, err %d", rc);
        /* Release the write transaction, otherwise LMDB's writer lock leaks. */
        mdb_txn_abort(tmptxn);
        return;
    }

    rc = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE);
    if (rc != 0)
    {
        elog(LOG, "mdb_cursor_get failed when clean pages, err %d, rel %d, fork %d, blk %d",
             rc, relid, forkno, blkno);
        mdb_cursor_close(tmpcursor);
        mdb_txn_abort(tmptxn);
        return;
    }

    for (;;)
    {
        cur = (LdPageKey *) key.mv_data;

        /* Left this relation fork: done. */
        if (cur->sk.dbid != dbid || cur->sk.relid != relid || cur->sk.forkno != forkno)
            break;

        elog(LOG, "del page  dbid %d, rel %d, fork %d, blk %d",
             cur->sk.dbid, cur->sk.relid, cur->sk.forkno, cur->sk.blkno);
        rc = mdb_cursor_del(tmpcursor, 0);
        if (rc != 0)
            elog(WARNING, "del page failed: err %d, rel %d, fork %d, blk %d",
                 rc, cur->sk.relid, cur->sk.forkno, cur->sk.blkno);

        /* After a delete, MDB_NEXT yields the record that followed it. */
        if (mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT) != 0)
            break;
    }

    mdb_cursor_close(tmpcursor);
    rc = mdb_txn_commit(tmptxn);
    if (rc != 0)
        elog(WARNING, "mdb_txn_commit failed when clean pages, err %d", rc);
}

static void *
RemovePageOrWalFromCurrentNode()
{
    //    ereport(INFO, (errmsg("RemovePageOrWalFromCurrentNode in processing")));
    MDB_txn *tmptxn;
    MDB_cursor *tmpcursor;
    MDB_val key, data;
    LdPageKey *lpk = NULL;
    PageKey *pk = NULL;
    SdPageValue *spv = NULL;
    int success = 1;
    int PageOrWal = 1;

    lpk = (LdPageKey *)malloc(sizeof(LdPageKey));

    LWLock *partitionLock = NULL;
    uint32 newHash;

    for (;;)
    {
        if (PageOrWal == (int)PAGE)
        {
            if (DPArray->pageIndex >= DPArray->tail)
            {
                continue;
            }
            pk = &DPArray->dpk[DPArray->pageIndex].pk;
        }
        else
        {
            if (DPArray->walIndex >= DPArray->pageIndex)
            {
                continue;
            }
            if (DPArray->dpk[DPArray->walIndex].pagedeleted = false)
            {
                continue;
            }
            pk = &DPArray->dpk[DPArray->walIndex].pk;
        }

        convertKeyLd(lpk, pk);
        key.mv_size = sizeof(LdPageKey);
        key.mv_data = lpk;

        if (PageOrWal == 1)
        {
            mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
            mdb_cursor_open(tmptxn, pageDbi, &tmpcursor);
        }
        else
        {
            mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
            mdb_cursor_open(tmptxn, walDbi, &tmpcursor);
        }

        success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET);

        if (success == 0)
        {
            mdb_cursor_del(tmpcursor, 0);
        }
        else
        {
            // DROP
            mdb_cursor_get(tmpcursor, &key, &data, MDB_PREV);
            mdb_cursor_del(tmpcursor, 0);
            if (PageOrWal == (int)PAGE)
            {
                // CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
                newHash = SecondBufferHashCode(&((SdPageValue *)data.mv_data)->pk);
                partitionLock = SecondBufferMappingPartitionLock(newHash);
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
                CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
                LWLockRelease(partitionLock);
            }
        }

        if (PageOrWal == (int)PAGE && success == 0)
        {
            // TRUNCATE
            if (DPArray->dpk[DPArray->walIndex].operation == (int)TRUNCATE)
            {
                // CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
                newHash = SecondBufferHashCode(&((SdPageValue *)data.mv_data)->pk);
                partitionLock = SecondBufferMappingPartitionLock(newHash);
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
                CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
                LWLockRelease(partitionLock);
            }
            else if (NULL == FindSecondBufferInTable(&lpk->sk)) // EVICT
            {
                mdb_put(tmptxn, pageDbi, &key, &data, MDB_NODUPDATA);
            }
        }

        mdb_cursor_close(tmpcursor);
        mdb_txn_commit(tmptxn);
        if (PageOrWal == (int)PAGE)
        {
            DPArray->dpk[DPArray->pageIndex].pagedeleted = true;
            DPArray->pageIndex = (DPArray->pageIndex + 1) % 1024;

            if (EnableHotStandby == false || *isPromoteIsTriggered)
            {
                DPArray->head = (DPArray->head + 1) % 1024;
                SpinLockAcquire(&DPArray->append);
                DPArray->unused++;
                SpinLockRelease(&DPArray->append);
            }
        }
        else
        {
            DPArray->head++;
            DPArray->walIndex = (DPArray->walIndex + 1) % 1024;
            DPArray->head = (DPArray->head + 1) % 1024;
            SpinLockAcquire(&DPArray->append);
            DPArray->unused++;
            SpinLockRelease(&DPArray->append);
        }
    }
}

static void
MovePageFromSecondBufferToLocalBuffer()
{
    printf("MovePageFromSecondBufferToLocalBuffer\n");
    MDB_txn *tmptxn;
    MDB_txn *txn = NULL;
    MDB_dbi dbi;
    MDB_val key, data;

    SingleKeyArray *ska = NULL;
    int localHead = 0;
    int localTail = 0;
    int localUnused = 0;
    int processNum = 0;
    int i = 0;
    int j = 0;
    SdPageKey spk;
    LdPageKey lpk;

    SdPageKeyEntity *spke = NULL;
    SdPageKeyList spkl;
    spkl.head = NULL;
    spkl.tail = NULL;

    SdPageValue *spv = NULL;
    int tb = -1, mp = -1, mtc = -1, mdo = -1;
    long sleeptime = 1000L;
    bool success;

    LWLock *partitionLock = NULL;
    uint32 newHash;
    bool canShutDown = false;
    pid_t ckp_pid;

    for (;;)
    {
        // exit when postmaster stop
        ResetLatch(MyLatch);
        if (ShutdownRequestPending)
        {
            if (canShutDown)
                proc_exit(0);

            ckp_pid = He3DBQueryCkpPid();
            if (ckp_pid == 0)
                canShutDown = true;

            if (kill(ckp_pid, 0) == -1)
            {
                if (errno == ESRCH)
                {
                    elog(LOG, "checkpoint process is shutdown, we can shutdown secondbuffer process after flush all buffer into lmdb");
                    canShutDown = true;
                }
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_INTERNAL_ERROR),
                             errmsg("could not check the existence of the backend with PID %d: %m",
                                    ckp_pid)));
            }
        }

        for (i = 0; i < SDNUM; i++)
        {
            ska = &MultiKeyArrays[i];
            SpinLockAcquire(&ska->oplock);
            localHead = ska->head;
            localTail = ska->tail;
            localUnused = ska->unused;
            SpinLockRelease(&ska->oplock);

            if (localUnused == 0)           //
            {
                processNum = SDLEN;
            }
            else if (localUnused == SDLEN)
            {
                continue;
            }
            else
            {
                processNum = (localTail + SDLEN - localHead) % SDLEN;
            }
            success = true;
            tb = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
            if (tb != 0)
            {
                ereport(LOG, errmsg("mdb_txn_begin failed,error code is %d", tb));
                continue;
            }

            for (j = 0; j < processNum; j++)
            {
                spk = ska->SdPageKeyList[(localHead + j) % SDLEN];
                spke = (SdPageKeyEntity *)malloc(sizeof(SdPageKeyEntity));
                spke->spk = spk;
                spke->next = NULL;

                if (spkl.head == NULL)
                {
                    spkl.head = spke;
                    spkl.tail = spkl.head;
                }
                else
                {
                    spkl.tail->next = spke;
                    spkl.tail = spkl.tail->next;
                }

                newHash = SecondBufferHashCode(&spk);
                partitionLock = SecondBufferMappingPartitionLock(newHash);
                LWLockAcquire(partitionLock, LW_SHARED);
                spv = FindSecondBufferInTable(&spk);
                if (spv == NULL || spv->pagecontent == NULL)
                {
                    LWLockRelease(partitionLock);
                    continue;
                }
                lpk.sk = spk;

                key.mv_size = sizeof(LdPageKey);
                key.mv_data = &lpk;

                data.mv_size = 8192;
                data.mv_data = spv->pagecontent;

                mp = mdb_put(tmptxn, pageDbi, &key, &data, 0);
                spv->canDelete = true;
                LWLockRelease(partitionLock);
                if (mp != 0)
                {
                    success = false;
                    ereport(LOG, errmsg("mdb_put failed, mp is %d", mp));
                    break;
                }
            }

            if (!success)
            {
                mdb_txn_abort(tmptxn);
            }
            else
            {
                mtc = mdb_txn_commit(tmptxn);
                if (mtc != 0)
                {
                    success = false;
                    ereport(LOG, errmsg("mdb_txn_commit failed,error is:%d", mtc));
                    mdb_txn_abort(tmptxn);
                }
            }

            SdPageKeyEntity *s = NULL;
            SdPageValue *spv = NULL;
            while (spkl.head != NULL)
            {
                s = spkl.head;
                if (success)
                {
                    spv = FindSecondBufferInTable(&s->spk);
                    if (spv != NULL)
                    {
                        newHash = SecondBufferHashCode(&s->spk);
                        partitionLock = SecondBufferMappingPartitionLock(newHash);
                        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
                        if (spv->canDelete)
                            CleanUpSecondBuffer(&s->spk);
                        LWLockRelease(partitionLock);
                    }
                }
                if (spkl.head->next != NULL)
                {
                    spkl.head = spkl.head->next;
                    free(s);
                }
                else
                {
                    free(spkl.head);
                    break;
                }
            }

            spkl.head = spkl.tail = NULL;

            if (!success)
            {
                continue;
            }

            ska->head = localTail;
            SpinLockAcquire(&ska->oplock);
            if (ska->unused == 0)
            {
                ska->unused = SDLEN;
                SpinLockAcquire(&statisticnum->change);
                statisticnum->totalunused += SDLEN;
                SpinLockRelease(&statisticnum->change);
            }
            else
            {
                ska->unused = ska->unused + (localTail + SDLEN - localHead) % SDLEN;
                SpinLockAcquire(&statisticnum->change);
                statisticnum->totalunused += ((localTail + SDLEN - localHead) % SDLEN);
                SpinLockRelease(&statisticnum->change);
            }

            SpinLockRelease(&ska->oplock);
        }

        double rate = statisticnum->totalunused / (SDLEN * SDNUM);
        if (rate < 0.1)
        {
            sleeptime = 0;
        }
        else if (rate > 0.6)
        {
            sleeptime += 1000L;
        }
        else if (rate < 0.5)
        {
            sleeptime = sleeptime / 2;
        }
        pg_usleep(sleeptime);

        (void)WaitLatch(MyLatch,
                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                        50L /* convert to ms */,
                        WAIT_EVENT_SECONDBUFFER_MAIN);
    }
}

/*
 * SignalStartSecondBuffer
 *     Post PMSIGNAL_SECONDBUFFER_WORKER to the postmaster (presumably asking
 *     it to launch the second-buffer worker; the postmaster handles it
 *     asynchronously).
 */
void
SignalStartSecondBuffer(void)
{
    SendPostmasterSignal(PMSIGNAL_SECONDBUFFER_WORKER);
}

/*
 * SecondBufferMain
 *     Entry point of the second-buffer process.  It sets up the process
 *     identity, memory context and signal handlers, starts the
 *     CleanPagesInLmdb helper thread, and then runs the flush loop.  It does
 *     not return normally.
 */
void
SecondBufferMain(void)
{
    MemoryContext SecondBuffer_context;
    pthread_t   ntid;
    int         err;

    MyBackendType = B_SECONDBUFFER;
    SecondBuffer_context = AllocSetContextCreate(TopMemoryContext,
                                                 "SecondBuffer",
                                                 ALLOCSET_DEFAULT_SIZES);
    MemoryContextSwitchTo(SecondBuffer_context);

    init_ps_display(NULL);

    SetProcessingMode(InitProcessing);

    pqsignal(SIGHUP, SIG_IGN);
    pqsignal(SIGINT, SIG_IGN);
    pqsignal(SIGTERM, SignalHandlerForShutdownRequest);

    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    pqsignal(SIGUSR2, SIG_IGN);

    pqsignal(SIGCHLD, SIG_DFL);

    /*
     * Start the helper thread while signals are still blocked.  They were
     * blocked when the postmaster forked us.  A new thread inherits its
     * creator's signal mask, so the helper keeps them blocked, and the
     * handlers above only ever run on this main thread.
     */
    err = pthread_create(&ntid, NULL, CleanPagesInLmdb, NULL);
    if (err != 0)
        elog(PANIC, "pthread_create CleanPagesInLmdb failed %s", strerror(err));

    /*
     * Unblock signals (they were blocked when the postmaster forked us)
     */
    PG_SETMASK(&UnBlockSig);

    MovePageFromSecondBufferToLocalBuffer();
}
